package com.houbank.demo.rabbitmq;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import org.springframework.web.filter.CharacterEncodingFilter;

@EnableRabbit
@Configuration
public class AmqpConfig {
    public static final String EXCHANGE = "spring.boot.direct"; //交换机名称
    public static final String ROUTINGKEY_FAIL = "spring.boot.routingKey.failure"; //失败路由关键字
    public static final String ROUTINGKEY = "spring.boot.routingKey"; //成功路由关键字
    public static final String QUEUE_NAME = "spring.demo"; //成功队列名称
    public static final String QUEUE_NAME_FAIL = "spring.demo.failure"; //失败队列名称

    //RabbitMQ的配置信息
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private Integer port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /*
    ====================================消息生产者===========================================================
     */

    /**
     * ConnectionFactory配置 （生产者消费者都必须配置）
     * 建立一个连接容器，类似数据库的连接池
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory =
                new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);// 确认机制
        /*
        *发布确认，template要求CachingConnectionFactory的publisherConfirms属性设置为true
        *这里需要显示调用
        *connectionFactory.setPublisherConfirms(true);
        * 才能进行消息的回调。
        */
        return connectionFactory;
    }

    /**
     * RabbitMQ的使用入口
      */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
        template.setMessageConverter(this.jsonMessageConverter());
        template.setMandatory(true);
        return template;
    }
/*
 ============================================ 消息消费者===============================================================
   消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。
 */
    /**
     * 交换机
     * 针对消费者配置
     * FanoutExchange: 将消息分发到所有的绑定队列，无routingkey的概念
     * HeadersExchange ：通过添加属性key-value匹配
     * DirectExchange: 按照routingkey分发到指定队列
     * TopicExchange: 多关键字匹配
     */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE);
    }

    /**
     * 成功队列
     *
     * @return
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME, true); //队列持久

    }

    /**
     * 失败队列
     * @return
     */
    @Bean
    public Queue queueFail() {
        return new Queue(QUEUE_NAME_FAIL, true); //队列持久

    }
    /**
     * 队列和对应的交换机进行绑定
     *
     * @return
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue()).to(exchange()).with(AmqpConfig.ROUTINGKEY);
    }
    /**
     * 队列和对应的交换机进行绑定
     * @return
     */
    @Bean
    public Binding bindingFail(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queueFail()).to(exchange()).with(AmqpConfig.ROUTINGKEY_FAIL);
    }


    //spring amqp默认的是jackson 的一个插件,
    // 目的将生产者生产的数据转换为json存入消息队列，由于fastjson的速度快于jackson,这里替换为fastjson的一个实现
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
       // return new FastJsonMessageConverter(); 无效
    }
//===============================rabbitamq消息消费手动控制======================================================================
   /* @Bean
    Receiver receiver(){
        return new Receiver();
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }
    *//**
     * 声明一个监听容器
     *
     * @return
     *//*
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(AmqpConfig.QUEUE_NAME);
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 消息消费后可以执行回调函数
        container.setMessageListener(listenerAdapter);
        return container;
    }

*/

    @Bean
    public CharacterEncodingFilter characterEncodingFilter() {
        CharacterEncodingFilter filter = new CharacterEncodingFilter();
        filter.setEncoding("UTF-8");
        filter.setForceEncoding(true);
        return filter;
    }

}
